package com.dyrnq.seckill.mq.rabbitmq;


import com.dyrnq.seckill.mq.Producer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import java.util.Objects;

import static com.dyrnq.seckill.mq.rabbitmq.RabbitConstants.*;

@Configuration
@Slf4j
@RequiredArgsConstructor
@ConditionalOnProperty(value = "queue.impl", havingValue = "rabbitmq")
public class RabbitMQConfiguration {

    public static final int MAX_CONCURRENT_CONSUMERS = 5;
    public static final int PREFETCH_COUNT = 1;

    private final RabbitMQProperties rabbitMqProperties;

    private RabbitMQProperties getRabbitMqProperties() {
        return this.rabbitMqProperties;
    }

    @Bean(RABBIT_CONNECTION_FACTORY)
    public ConnectionFactory connectionFactory() {
        String nodes = Objects.requireNonNullElse(getRabbitMqProperties().getNodes(), DEFAULT_LOCALHOST);
        String virtualHost = Objects.requireNonNullElse(getRabbitMqProperties().getVirtualHost(), DEFAULT_VH);
        String userName = Objects.requireNonNullElse(getRabbitMqProperties().getUserName(), DEFAULT_USERNAME_PASSWORD);
        String password = Objects.requireNonNullElse(getRabbitMqProperties().getPassword(), DEFAULT_USERNAME_PASSWORD);
        String connectionName = Objects.requireNonNullElse(getRabbitMqProperties().getConnectionName(), DEFAULT_CONNECTION_NAME);

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(nodes);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setUsername(userName);
        connectionFactory.setPassword(password);
        connectionFactory.setConnectionNameStrategy(c -> connectionName);
        return connectionFactory;
    }

    @Bean(RABBIT_SEND_TEMPLATE)
    public RabbitTemplate sendRabbitTemplate(@Qualifier(RABBIT_CONNECTION_FACTORY) ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean(RABBIT_LISTENER_CONTAINER_FACTORY)
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(@Qualifier(RABBIT_CONNECTION_FACTORY) ConnectionFactory connectionFactory) {
        log.info("自动配置RabbitListenerContainerFactory");
        SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
        simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
        // 消费者数量
        simpleRabbitListenerContainerFactory.setConcurrentConsumers(5);
        // 最大消费者数量
        simpleRabbitListenerContainerFactory.setMaxConcurrentConsumers(10);
        // 手动ack，当消息n次重试消费端依旧处理失败时，可重新回归队列
        // 重启项目后可再次消费，但是如果未及时处理会导致消息队列中数据越来越多
        // simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        // 自动ack，若是n次重试仍旧消费失败后，消息会丢弃，
        // 虽然会产生数据丢失，但是可以防止消息队列中数据冗余
        simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);

        // 消息转换
        simpleRabbitListenerContainerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        // 消费者自动启动
        simpleRabbitListenerContainerFactory.setAutoStartup(true);
        // 消费者每次从队列获取的消息数量
        simpleRabbitListenerContainerFactory.setPrefetchCount(1);

        // 重试机制
        // 这种方式不生效
        // simpleRabbitListenerContainerFactory.setRetryTemplate(retryTemplate());
        simpleRabbitListenerContainerFactory.setAdviceChain(
                RetryInterceptorBuilder
                        .stateless()
                        .recoverer(new RejectAndDontRequeueRecoverer())
                        .retryOperations(retryTemplate())
                        .build());

        // true时消费者消费失败，自动重新入队;
        // 重试次数超过上面的设置之后是否丢弃（false不丢弃时需要写相应代码将该消息加入死信队列）
        simpleRabbitListenerContainerFactory.setDefaultRequeueRejected(true);

        return simpleRabbitListenerContainerFactory;

    }

    @Bean(RABBIT_ADMIN)
    public RabbitAdmin rabbitAdmin(@Qualifier(RABBIT_CONNECTION_FACTORY) ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setBackOffPolicy(backOffPolicy());
        retryTemplate.setRetryPolicy(retryPolicy());

        // 设置消费消息过程监听（不是必须）
        retryTemplate.registerListener(new RetryListener() {
            @Override
            public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
                // 执行之前调用 （返回false时会终止执行）
                log.debug("***********open: 开始 count: {}", retryContext.getRetryCount());
                return true;
            }

            @Override
            public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
                // 重试结束或消费成功的时候调用
                log.debug("***********close: 结束: count: {} ", retryContext.getRetryCount());
            }

            @Override
            public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
                //  异常 都会调用
                log.error("***********尝试第{}次重新调用", retryContext.getRetryCount());
            }
        });
        return retryTemplate;
    }

    /**
     * 消费时便重试时间间隔
     *
     * @return
     */
    @Bean
    public ExponentialBackOffPolicy backOffPolicy() {
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(3000);
        backOffPolicy.setMaxInterval(10000);
        return backOffPolicy;
    }

    /**
     * 消费失败重试次数
     *
     * @return
     */
    @Bean
    public SimpleRetryPolicy retryPolicy() {
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        // 消息最大发送次数，包括第一次和重试次数
        retryPolicy.setMaxAttempts(3);
        return retryPolicy;
    }

    @Bean
    public Queue topicQueue() {
        /*
        durable：队列是否可持久化，默认为true。
        exclusive： 队列是否具有排它性，默认为false。
        autoDelete：队列没有任何订阅的消费者时是否自动删除，默认为false。
        */
        return new Queue(Producer.QUEUE_NAME, true, false, false);
    }

    // 交换机持久化且不自动删除
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(getRabbitMqProperties().getExchangeName(), true, false);
    }

    @Bean
    public Binding bindingTopicQueue(Queue topicQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue).to(topicExchange).with(Producer.QUEUE_NAME);
    }

}
